[[seek]]
= Seeking to a Specific Offset

In order to seek, your listener must implement `ConsumerSeekAware`, which has the following methods:

[source, java]
----
void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
----

The `registerSeekCallback` is called when the container is started and whenever partitions are assigned.
You should use this callback when seeking at some arbitrary time after initialization.
You should save a reference to the callback.
If you use the same listener in multiple containers (or in a `ConcurrentMessageListenerContainer`), you should store the callback in a `ThreadLocal` or some other structure keyed by the listener `Thread`.

When using group management, `onPartitionsAssigned` is called when partitions are assigned.
You can use this method, for example, for setting initial offsets for the partitions, by calling the callback.
You can also use this method to associate this thread's callback with the assigned partitions (see the example below).
You must use the callback argument, not the one passed into `registerSeekCallback`.
Starting with version 2.5.5, this method is called, even when using xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[manual partition assignment].

`onPartitionsRevoked` is called when the container is stopped or Kafka revokes assignments.
You should discard this thread's callback and remove any associations to the revoked partitions.

The callback has the following methods:

[source, java]
----
void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();
----

The two different variants of the `seek` methods provide a way to seek to an arbitrary offset.
The method that takes a `Function` as an argument to compute the offset was added in version 3.2 of the framework.
This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched).
The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition.

`seekRelative` was added in version 2.3, to perform relative seeks.

* `offset` negative and `toCurrent` `false` - seek relative to the end of the partition.
* `offset` positive and `toCurrent` `false` - seek relative to the beginning of the partition.
* `offset` negative and `toCurrent` `true` - seek relative to the current position (rewind).
* `offset` positive and `toCurrent` `true` - seek relative to the current position (fast forward).

The `seekToTimestamp` methods were also added in version 2.3.

NOTE: When seeking to the same timestamp for multiple partitions in the `onIdleContainer` or `onPartitionsAssigned` methods, the second method is preferred because it is more efficient to find the offsets for the timestamps in a single call to the consumer's `offsetsForTimes` method.
When called from other locations, the container will gather all timestamp seek requests and make one call to `offsetsForTimes`.

You can also perform seek operations from `onIdleContainer()` when an idle container is detected.
See xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers] for how to enable idle container detection.

NOTE: The `seekToBeginning` method that accepts a collection is useful, for example, when processing a compacted topic and you wish to seek to the beginning every time the application is started:

[source, java]
----
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}
----

To arbitrarily seek at runtime, use the callback reference from the `registerSeekCallback` for the appropriate thread.

Here is a trivial Spring Boot application that demonstrates how to use the callback; it sends 10 records to the topic; hitting `<Enter>` in the console causes all partitions to seek to the beginning.

[source, java]
----
@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
----

To make things simpler, version 2.3 added the `AbstractConsumerSeekAware` class, which keeps track of which callback is to be used for a topic/partition.
The following example shows how to seek to the last record processed, in each partition, each time the container goes idle.
It also has methods that allow arbitrary external calls to rewind partitions by one record.

[source, java]
----
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}
----

Version 2.6 added convenience methods to the abstract class:

* `seekToBeginning()` - seeks all assigned partitions to the beginning.
* `seekToEnd()` - seeks all assigned partitions to the end.
* `seekToTimestamp(long timestamp)` - seeks all assigned partitions to the offset represented by that timestamp.

Example:

[source, java]
----
public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}

----

As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface.
This method is particularly useful when you need to identify the consumer group associated with a specific seek callback.

NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class.
This might not always be the desired behavior.
To address this, you can use the `getGroupId()` method provided by the callback.
This allows you to perform seek operations selectively, targeting only the consumer group of interest.

